In [1]:
%run startup.py

In [2]:
%%javascript
$.getScript('./assets/js/ipython_notebook_toc.js')


A Decision Tree of Observable Operators

Part 5: Consolidating Streams

source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)

This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.

We also require acquaintance with the marble diagrams feature of RxPy.

Table of Contents

I want to reemit only certain items from an Observable

... by filtering out those that do not match some predicate filter/where


In [1]:
reset_start_time(O.filter) # alias: where
d = subs(O.range(0, 5).filter(lambda x, i: x % 2 == 0))


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-1-c3481d609448> in <module>()
----> 1 reset_start_time(O.filter) # alias: where
      2 d = subs(O.range(0, 5).filter(lambda x, i: x % 2 == 0))

NameError: name 'reset_start_time' is not defined

... by slicing slice


In [3]:
reset_start_time(O.slice)
s = marble_stream('r-e-a-c-t-i-v-e-|')
d = subs(s.slice(5, 10))
sleep(1)
# start stop step:
d = subs(s.slice(1, -1, 2))



========== slice_ ==========

module rx.linq.observable.slice
@extensionmethod(Observable, name="slice")
def slice_(self, start=None, stop=None, step=1):
    Slices the given observable. It is basically a wrapper around the
    operators skip(), skip_last(), take(), take_last() and filter().

    This marble diagram helps you remember how slices works with streams.
    Positive numbers is relative to the start of the events, while negative
    numbers are relative to the end (on_completed) of the stream.

    r---e---a---c---t---i---v---e---|
    0   1   2   3   4   5   6   7   8
   -8  -7  -6  -5  -4  -3  -2  -1

    Example:
    result = source.slice(1, 10)
    result = source.slice(1, -2)
    result = source.slice(1, -1, 2)

    Keyword arguments:
    :param Observable self: Observable to slice
    :param int start: Number of elements to skip of take last
    :param int stop: Last element to take of skip last
    :param int step: Takes every step element. Must be larger than zero

    :returns: Returns a sliced observable sequence.
    :rtype: Observable
--------------------------------------------------------------------------------

   3.2     M New subscription on stream 275034569
 571.3   T13 [next]  568.0: i
 681.0   T15 [next]  677.6: v
 788.2   T17 [next]  784.8: e
 898.9   T19 [cmpl]  895.6: fin

1009.3     M New subscription on stream 276459417
1245.7   T25 [next]  236.2: e
1465.8   T30 [next]  456.3: c
1686.8   T33 [next]  677.3: i
1910.5   T37 [cmpl]  901.0: fin

... that is, only the first item first


In [27]:
rst(O.first)
# match on index:
d = subs(O.from_((1, 2 ,3)).first(lambda x, i: i==1))



========== first ==========

module rx.linq.observable.first
@extensionmethod(ObservableBase)
def first(self, predicate=None):
    Returns the first element of an observable sequence that satisfies
    the condition in the predicate if present else the first item in the
    sequence.

    Example:
    res = res = source.first()
    res = res = source.first(lambda x: x > 3)

    Keyword arguments:
    predicate -- {Function} [Optional] A predicate function to evaluate for
        elements in the source sequence.

    Returns {Observable} Sequence containing the first element in the
    observable sequence that satisfies the condition in the predicate if
    provided, else the first item in the sequence.
--------------------------------------------------------------------------------

   2.2     M New subscription on stream 276538769
   2.9     M [next]    0.6: 2
   3.0     M [cmpl]    0.7: fin

...that is, only the first items take, take_with_time


In [28]:
rst(O.take)
d = subs(O.from_((1, 2, 3, 4)).take(2))
rst(O.take_with_time)
d = subs(marble_stream('1-2-3-4|').take_with_time(200))



========== take ==========

module rx.linq.observable.take
@extensionmethod(ObservableBase)
def take(self, count, scheduler=None):
    Returns a specified number of contiguous elements from the start of
    an observable sequence, using the specified scheduler for the edge case
    of take(0).

    1 - source.take(5)
    2 - source.take(0, rx.Scheduler.timeout)

    Keyword arguments:
    count -- The number of elements to return.
    scheduler -- [Optional] Scheduler used to produce an OnCompleted
        message in case count is set to 0.

    Returns an observable sequence that contains the specified number of
    elements from the start of the input sequence.
--------------------------------------------------------------------------------

   3.0     M New subscription on stream 276541045
   3.6     M [next]    0.5: 1
   3.8     M [next]    0.7: 2
   4.0     M [cmpl]    0.9: fin


========== take_with_time ==========

module rx.linq.observable.takewithtime
@extensionmethod(ObservableBase)
def take_with_time(self, duration, scheduler=None):
    Takes elements for the specified duration from the start of the
    observable source sequence, using the specified scheduler to run timers.

    Example:
    res = source.take_with_time(5000,  [optional scheduler])

    Description:
    This operator accumulates a queue with a length enough to store elements
    received during the initial duration window. As more elements are
    received, elements older than the specified duration are taken from the
    queue and produced on the result sequence. This causes elements to be
    delayed with duration.

    Keyword arguments:
    duration -- {Number} Duration for taking elements from the start of the
        sequence.
    scheduler -- {Scheduler} Scheduler to run the timer on. If not
        specified, defaults to rx.Scheduler.timeout.

    Returns {Observable} An observable sequence with the elements taken
    during the specified duration from the start of the source sequence.
--------------------------------------------------------------------------------

   1.4     M New subscription on stream 276540693
  13.6  T150 [next]   12.1: 1
 123.5  T151 [next]  122.0: 2
 204.9  T149 [cmpl]  203.4: fin

... that is, only the last item last, last_or_default, take_last


In [29]:
rst(O.last,  title=True)
d = subs(O.from_((1, 2, 3)).last(lambda x: x < 3))
rst(O.last_or_default,  title=True)
d = subs(O.from_((1, 2, 3)).last_or_default(lambda x: x > 3))
d = subs(O.from_((1, 2, 3)).last_or_default(lambda x: x > 3, '42'))

rst(O.take_last, title=True)
d = subs(O.from_((1, 2, 3, 4)).take_last(2))



========== last ==========

module rx.linq.observable.last
@extensionmethod(ObservableBase)
def last(self, predicate=None):
    Returns the last element of an observable sequence that satisfies the
    condition in the predicate if specified, else the last element.

    Example:
    res = source.last()
    res = source.last(lambda x: x > 3)

    Keyword arguments:
    predicate -- {Function} [Optional] A predicate function to evaluate for
        elements in the source sequence.

    Returns {Observable} Sequence containing the last element in the
    observable sequence that satisfies the condition in the predicate.
--------------------------------------------------------------------------------

   2.5     M New subscription on stream 276538821
   3.2     M [next]    0.6: 2
   3.4     M [cmpl]    0.8: fin


========== last_or_default ==========

module rx.linq.observable.lastordefault
@extensionmethod(ObservableBase)
def last_or_default(self, predicate=None, default_value=None):
    Return last or default element.

    Returns the last element of an observable sequence that satisfies
    the condition in the predicate, or a default value if no such
    element exists.

    Examples:
    res = source.last_or_default()
    res = source.last_or_default(lambda x: x > 3)
    res = source.last_or_default(lambda x: x > 3, 0)
    res = source.last_or_default(None, 0)

    predicate -- {Function} [Optional] A predicate function to evaluate
        for elements in the source sequence.
    default_value -- [Optional] The default value if no such element
        exists. If not specified, defaults to None.

    Returns {Observable} Sequence containing the last element in the
    observable sequence that satisfies the condition in the predicate,
    or a default value if no such element exists.
--------------------------------------------------------------------------------

   1.4     M New subscription on stream 276540749
   2.0     M [next]    0.5: None
   2.1     M [cmpl]    0.6: fin

   2.7     M New subscription on stream 276540757
   3.3     M [next]    0.5: 42
   3.4     M [cmpl]    0.6: fin


========== take_last ==========

module rx.linq.observable.takelast
@extensionmethod(ObservableBase)
def take_last(self, count):
    Returns a specified number of contiguous elements from the end of an
    observable sequence.

    Example:
    res = source.take_last(5)

    Description:
    This operator accumulates a buffer with a length enough to store
    elements count elements. Upon completion of the source sequence, this
    buffer is drained on the result sequence. This causes the elements to be
    delayed.

    Keyword arguments:
    :param int count: Number of elements to take from the end of the source
        sequence.

    :returns: An observable sequence containing the specified number of elements 
        from the end of the source sequence.
    :rtype: Observable
--------------------------------------------------------------------------------

   1.7     M New subscription on stream 276541765
   2.4     M [next]    0.6: 3
   2.5     M [next]    0.7: 4
   2.6     M [cmpl]    0.8: fin

... that is, only item n element_at, element_at_or_default


In [30]:
rst(O.element_at)
d = subs(O.from_((1, 2, 3, 4)).element_at(2))
rst(O.element_at_or_default)
d = subs(O.from_((1, 2, 3, 4)).element_at_or_default(6, '42'))



========== element_at ==========

module rx.linq.observable.elementat
@extensionmethod(ObservableBase)
def element_at(self, index):
    Returns the element at a specified index in a sequence.

    Example:
    res = source.element_at(5)

    Keyword arguments:
    :param int index: The zero-based index of the element to retrieve.

    :returns: An observable  sequence that produces the element at the
    specified position in the source sequence.
    :rtype: Observable
--------------------------------------------------------------------------------

   2.2     M New subscription on stream 278634469
   2.8     M [next]    0.5: 3
   2.9     M [cmpl]    0.6: fin


========== element_at_or_default ==========

module rx.linq.observable.elementatordefault
@extensionmethod(ObservableBase)
def element_at_or_default(self, index, default_value=None):
    Returns the element at a specified index in a sequence or a default
    value if the index is out of range.

    Example:
    res = source.element_at_or_default(5)
    res = source.element_at_or_default(5, 0)

    Keyword arguments:
    index -- {Number} The zero-based index of the element to retrieve.
    default_value -- [Optional] The default value if the index is outside
        the bounds of the source sequence.

    Returns an observable {Observable} sequence that produces the element at
        the specified position in the source sequence, or a default value if
        the index is outside the bounds of the source sequence.
--------------------------------------------------------------------------------

   1.3     M New subscription on stream 276533629
   2.1     M [next]    0.6: 42
   2.2     M [cmpl]    0.7: fin

... that is, only those items after the first items

... that is, after the first n items skip, skip_with_time


In [31]:
rst(O.skip, title=True)
d = subs(O.range(0, 5).skip(2))
rst(O.skip_with_time, title=True)
d = subs(marble_stream('1-2-3-4-5-6').skip_with_time(200))



========== skip ==========

module rx.linq.observable.skip
@extensionmethod(ObservableBase)
def skip(self, count):
    Bypasses a specified number of elements in an observable sequence
    and then returns the remaining elements.

    Keyword arguments:
    count -- The number of elements to skip before returning the remaining
        elements.

    Returns an observable sequence that contains the elements that occur
    after the specified index in the input sequence.
--------------------------------------------------------------------------------

   2.5     M New subscription on stream 276082845
   3.1     M [next]    0.5: 2
   3.3     M [next]    0.7: 3
   3.7     M [next]    1.1: 4
   3.9     M [cmpl]    1.3: fin


========== skip_with_time ==========

module rx.linq.observable.skipwithtime
@extensionmethod(ObservableBase)
def skip_with_time(self, duration, scheduler=None):
    Skips elements for the specified duration from the start of the
    observable source sequence, using the specified scheduler to run timers.

    Example:
    1 - res = source.skip_with_time(5000, [optional scheduler])

    Description:
    Specifying a zero value for duration doesn't guarantee no elements will
    be dropped from the start of the source sequence. This is a side-effect
    of the asynchrony introduced by the scheduler, where the action that
    causes callbacks from the source sequence to be forwarded may not
    execute immediately, despite the zero due time.

    Errors produced by the source sequence are always forwarded to the
    result sequence, even if the error occurs before the duration.

    Keyword arguments:
    duration -- {Number} Duration for skipping elements from the start of
        the sequence.
    scheduler -- {Scheduler} Scheduler to run the timer on. If not
        specified, defaults to Rx.Scheduler.timeout.

    Returns n observable {Observable} sequence with the elements skipped
    during the specified duration from the start of the source sequence.
--------------------------------------------------------------------------------

   1.7     M New subscription on stream 276541889
 236.1  T165 [next]  234.4: 3
 347.9  T167 [next]  346.1: 4
 455.7  T168 [next]  453.9: 5
 565.7  T170 [next]  563.9: 6
 566.5  T172 [cmpl]  564.7: fin

... that is, until one of those items matches a predicate skip_while


In [16]:
rst(O.skip_while)
# skipping only AS LONG AS the function is true. If already false at the beginning -> all flushed:
d = subs(O.from_((1, 2, 3, 4, 5, 6)).skip_while(lambda x: x in (1, 2)))



========== skip_while ==========

module rx.linq.observable.skipwhile
@extensionmethod(ObservableBase)
def skip_while(self, predicate):
    Bypasses elements in an observable sequence as long as a specified
    condition is true and then returns the remaining elements. The
    element's index is used in the logic of the predicate function.

    1 - source.skip_while(lambda value: value < 10)
    2 - source.skip_while(lambda value, index: value < 10 or index < 10)

    predicate -- A function to test each element for a condition; the
        second parameter of the function represents the index of the
        source element.

    Returns an observable sequence that contains the elements from the
    input sequence starting at the first element in the linear series that
    does not pass the test specified by predicate.
--------------------------------------------------------------------------------

   1.7     M New subscription on stream 275707413
   2.2     M [next]    0.5: 3
   2.5     M [next]    0.8: 4
   2.8     M [next]    1.1: 5
   3.1     M [next]    1.4: 6
   3.4     M [cmpl]    1.6: fin

... that is, after a second Observable emits an item skip_until, skip_until_with_time


In [3]:
rst(O.skip_until)
s1 = marble_stream('1-2-3-4-5|')
s2 = marble_stream('--2------|')
d = subs(s1.skip_until(s2))
sleep(0.5)
rst(O.skip_until_with_time)
d = subs(s1.skip_until_with_time(300))



========== skip_until ==========

module rx.linq.observable.skipuntil
@extensionmethod(ObservableBase)
def skip_until(self, other):
    Returns the values from the source observable sequence only after
    the other observable sequence produces a value.

    other -- The observable sequence that triggers propagation of elements
        of the source sequence.

    Returns an observable sequence containing the elements of the source
    sequence starting from the point the other sequence triggered
    propagation.
--------------------------------------------------------------------------------

   2.8     M New subscription on stream 276474365
 235.7    T8 [next]  232.9: 3
 346.9   T10 [next]  344.1: 4
 456.9   T12 [next]  454.1: 5
 466.9   T13 [cmpl]  464.1: fin


========== skip_until_with_time ==========

module rx.linq.observable.skipuntilwithtime
@extensionmethod(ObservableBase)
def skip_until_with_time(self, start_time, scheduler=None):
    Skips elements from the observable source sequence until the
    specified start time, using the specified scheduler to run timers.
    Errors produced by the source sequence are always forwarded to the
    result sequence, even if the error occurs before the start time.

    Examples:
    res = source.skip_until_with_time(new Date(), [optional scheduler]);
    res = source.skip_until_with_time(5000, [optional scheduler]);

    Keyword arguments:
    start_time -- Time to start taking elements from the source sequence. If
        this value is less than or equal to Date(), no elements will be
        skipped.
    scheduler -- Scheduler to run the timer on. If not specified, defaults
        to rx.Scheduler.timeout.

    Returns {Observable} An observable sequence with the elements skipped
    until the specified start time.
--------------------------------------------------------------------------------

   3.2     M New subscription on stream 276064481
 348.3   T26 [next]  344.9: 4
 461.7   T28 [next]  458.4: 5
 468.8   T29 [cmpl]  465.5: fin

... that is, those items except the last items

... that is, except the last n items skip_last, skip_last_with_time


In [3]:
rst(O.skip_last)
s1 = marble_stream('1-2-3-4-5|')
s2 = marble_stream('--2------|')
d = subs(s1.skip_last(2))
sleep(0.5)
rst(O.skip_last_with_time)
d = subs(s1.skip_last_with_time(300))



========== skip_last ==========

module rx.linq.observable.skiplast
@extensionmethod(ObservableBase)
def skip_last(self, count):
    Bypasses a specified number of elements at the end of an observable
    sequence.

    Description:
    This operator accumulates a queue with a length enough to store the
    first `count` elements. As more elements are received, elements are
    taken from the front of the queue and produced on the result sequence.
    This causes elements to be delayed.

    Keyword arguments
    count -- Number of elements to bypass at the end of the source sequence.

    Returns an observable {Observable} sequence containing the source
    sequence elements except for the bypassed ones at the end.
--------------------------------------------------------------------------------

   2.6     M New subscription on stream 276525625
 234.8   T31 [next]  232.2: 1
 345.4   T33 [next]  342.8: 2
 455.7   T36 [next]  453.0: 3
 469.9   T37 [cmpl]  467.3: fin


========== skip_last_with_time ==========

module rx.linq.observable.skiplastwithtime
@extensionmethod(ObservableBase)
def skip_last_with_time(self, duration, scheduler=None):
    Skips elements for the specified duration from the end of the
    observable source sequence, using the specified scheduler to run timers.

    1 - res = source.skip_last_with_time(5000)
    2 - res = source.skip_last_with_time(5000, scheduler)

    Description:
    This operator accumulates a queue with a length enough to store elements
    received during the initial duration window. As more elements are
    received, elements older than the specified duration are taken from the
    queue and produced on the result sequence. This causes elements to be
    delayed with duration.

    Keyword arguments:
    duration -- {Number} Duration for skipping elements from the end of the
        sequence.
    scheduler -- {Scheduler} [Optional]  Scheduler to run the timer on. If
        not specified, defaults to Rx.Scheduler.timeout
    Returns an observable {Observable} sequence with the elements skipped
    during the specified duration from the end of the source sequence.
--------------------------------------------------------------------------------

   3.9     M New subscription on stream 276128193
 348.3   T46 [next]  344.2: 1
 459.2   T47 [next]  455.2: 2
 471.7   T50 [cmpl]  467.7: fin

... that is, until one of those items matches a predicate take_while


In [6]:
rst(O.take_while)
d = subs(O.from_((1, 2, 3)).take_while(lambda x: x<3))



========== take_while ==========

module rx.linq.observable.takewhile
@extensionmethod(ObservableBase)
def take_while(self, predicate):
    Returns elements from an observable sequence as long as a specified
    condition is true. The element's index is used in the logic of the
    predicate function.

    1 - source.take_while(lambda value: value < 10)
    2 - source.take_while(lambda value, index: value < 10 or index < 10)

    Keyword arguments:
    predicate -- A function to test each element for a condition; the
        second parameter of the function represents the index of the source
        element.

    Returns an observable sequence that contains the elements from the
    input sequence that occur before the element at which the test no
    longer passes.
--------------------------------------------------------------------------------

   2.6     M New subscription on stream 278900289
   2.9     M [next]    0.3: 1
   3.5     M [next]    0.8: 2
   4.0     M [cmpl]    1.4: fin

...that is, except items emitted during a period of time before the source completes skip_last, skip_last_with_time


In [8]:
# (see above)

...that is, except items emitted after a second Observable emits an item take_until, take_until_with_time


In [12]:
rst(O.take_until)
s1 = marble_stream('1-2-3-4-5|')
s2 = marble_stream('--2------|')
d = subs(s1.take_until(s2))
sleep(0.5)
rst(O.take_until_with_time)
d = subs(s1.take_until_with_time(300))



========== take_until ==========

module rx.linq.observable.takeuntil
@extensionmethod(ObservableBase)
def take_until(self, other):
    Returns the values from the source observable sequence until the
    other observable sequence produces a value.

    Keyword arguments:
    other -- Observable sequence that terminates propagation of elements of
        the source sequence.

    Returns an observable sequence containing the elements of the source
    sequence up to the point the other sequence interrupted further
    propagation.
--------------------------------------------------------------------------------

   2.1     M New subscription on stream 278898461
  13.8  T131 [next]   11.6: 1
 123.9  T133 [next]  121.7: 2
 216.0  T143 [cmpl]  213.8: fin


========== take_until_with_time ==========

module rx.linq.observable.takeuntilwithtime
@extensionmethod(ObservableBase)
def take_until_with_time(self, end_time, scheduler=None):
    Takes elements for the specified duration until the specified end
    time, using the specified scheduler to run timers.

    Examples:
    1 - res = source.take_until_with_time(dt, [optional scheduler])
    2 - res = source.take_until_with_time(5000, [optional scheduler])

    Keyword Arguments:
    end_time -- {Number | Date} Time to stop taking elements from the source
        sequence. If this value is less than or equal to Date(), the
        result stream will complete immediately.
    scheduler -- {Scheduler} Scheduler to run the timer on.

    Returns an observable {Observable} sequence with the elements taken
    until the specified end time.
--------------------------------------------------------------------------------

   2.1     M New subscription on stream 278906801
  14.7  T148 [next]   12.5: 1
 128.0  T149 [next]  125.8: 2
 235.5  T152 [next]  233.4: 3
 307.8  T147 [cmpl]  305.7: fin

... by sampling the Observable periodically sample


In [26]:
rst(O.sample)
xs =     marble_stream('1-2-3-4-5-6-7-8-9-1-2-3-4-5-6-E|')
sampler =marble_stream('---1---1----------1------------|')
d = subs(xs.sample(300))
sleep(2)
d = subs(xs.sample(sampler=sampler))



========== sample ==========

module rx.linq.observable.sample
@extensionmethod(Observable, alias="throttle_last")
def sample(self, interval=None, sampler=None, scheduler=None):
    Samples the observable sequence at each interval.

    1 - res = source.sample(sample_observable) # Sampler tick sequence
    2 - res = source.sample(5000) # 5 seconds
    2 - res = source.sample(5000, rx.scheduler.timeout) # 5 seconds

    Keyword arguments:
    source -- Source sequence to sample.
    interval -- Interval at which to sample (specified as an integer
        denoting milliseconds).
    scheduler -- [Optional] Scheduler to run the sampling timer on. If not
        specified, the timeout scheduler is used.

    Returns sampled observable sequence.
--------------------------------------------------------------------------------

   1.9     M New subscription on stream 288677153
 314.2  T847 [next]  312.2: 3
 619.4  T849 [next]  617.4: 6
 921.6  T850 [next]  919.6: 9
1226.2  T851 [next] 1224.1: 2
1530.3  T852 [next] 1528.2: 5
1836.2  T853 [next] 1834.1: E
1836.9  T853 [cmpl] 1834.9: fin

2018.3     M New subscription on stream 279189229
2338.6  T889 [next]  320.3: 3
2651.7  T890 [next]  633.4: 6
3661.6  T893 [next] 1643.3: 6
4872.2  T894 [next] 2853.8: E
4872.5  T894 [cmpl] 2854.2: fin

... by only emitting items that are not followed by other items within some duration debounce


In [30]:
rst(O.debounce)
s = marble_stream('-12-3-4--5--6---7---8----9----a')
print('flushing a value every >= 300ms')
d = subs(s.debounce(300))



========== debounce ==========

module rx.linq.observable.debounce
@extensionmethod(Observable, alias="throttle_with_timeout")
def debounce(self, duetime, scheduler=None):
    Ignores values from an observable sequence which are followed by
    another value before duetime.

    Example:
    1 - res = source.debounce(5000) # 5 seconds
    2 - res = source.debounce(5000, scheduler)

    Keyword arguments:
    duetime -- {Number} Duration of the throttle period for each value
        (specified as an integer denoting milliseconds).
    scheduler -- {Scheduler} [Optional]  Scheduler to run the throttle
        timers on. If not specified, the timeout scheduler is used.

    Returns {Observable} The debounced sequence.
--------------------------------------------------------------------------------
flushing a value every >= 300ms

   3.4     M New subscription on stream 281048569
1062.2  T977 [next] 1058.8: 6
1368.6  T978 [next] 1365.2: 7
1685.5  T979 [next] 1682.1: 8
2092.7  T980 [next] 2089.4: 9
2203.3  T970 [cmpl] 2199.9: fin

... by suppressing items that are duplicates of already-emitted items distinct


In [43]:
rst(O.distinct)
s = O.from_((1, 2, 1, 1, 3))
d = subs(s.distinct(lambda x: x*2))
d = subs(s.distinct(lambda x: x, lambda a, b: a==2))



========== distinct ==========

module rx.linq.observable.distinct
@extensionmethod(ObservableBase)
def distinct(self, key_mapper=None, comparer=None):
    Returns an observable sequence that contains only distinct elements
    according to the key_mapper and the comparer. Usage of this operator
    should be considered carefully due to the maintenance of an internal
    lookup structure which can grow large.

    Example:
    res = obs = xs.distinct()
    obs = xs.distinct(lambda x: x.id)
    obs = xs.distinct(lambda x: x.id, lambda a,b: a == b)

    Keyword arguments:
    key_mapper -- {Function} [Optional]  A function to compute the
        comparison key for each element.
    comparer -- {Function} [Optional]  Used to compare items in the
        collection.

    Returns an observable {Observable} sequence only containing the distinct
    elements, based on a computed key value, from the source sequence.
--------------------------------------------------------------------------------

   2.6     M New subscription on stream 286822853
   3.0     M [next]    0.3: 1
   3.2     M [next]    0.5: 2
   3.5     M [next]    0.9: 3
   3.8     M [cmpl]    1.1: fin

   4.3     M New subscription on stream 286822721
   4.6     M [next]    0.2: 1
   4.9     M [next]    0.5: 2
   5.4     M [cmpl]    1.0: fin

... if they immediately follow the item they are duplicates of distinct_until_changed


In [45]:
rst(O.distinct_until_changed)
s = O.from_((1, 2, 1, 1, 3))
d = subs(s.distinct_until_changed(lambda x: x*2))
d = subs(s.distinct_until_changed(lambda x: x, lambda a, b: a==2))



========== distinct_until_changed ==========

module rx.linq.observable.distinctuntilchanged
@extensionmethod(ObservableBase)
def distinct_until_changed(self, key_mapper=None, comparer=None):
    Returns an observable sequence that contains only distinct
    contiguous elements according to the key_mapper and the comparer.

    1 - obs = observable.distinct_until_changed();
    2 - obs = observable.distinct_until_changed(lambda x: x.id)
    3 - obs = observable.distinct_until_changed(lambda x: x.id,
                                                lambda x, y: x == y)

    key_mapper -- [Optional] A function to compute the comparison key for
        each element. If not provided, it projects the value.
    comparer -- [Optional] Equality comparer for computed key values. If
        not provided, defaults to an equality comparer function.

    Return An observable sequence only containing the distinct contiguous
    elements, based on a computed key value, from the source sequence.
--------------------------------------------------------------------------------

   3.2     M New subscription on stream 281042609
   4.1     M [next]    0.8: 1
   5.0     M [next]    1.7: 2
   5.4     M [next]    2.1: 1
   5.9     M [next]    2.6: 3
   6.2     M [cmpl]    2.9: fin

   7.0     M New subscription on stream 286831301
   7.5     M [next]    0.3: 1
   7.8     M [next]    0.7: 2
   8.2     M [cmpl]    1.0: fin

... by delaying my subscription to it for some time after it begins emitting items delay_subscription


In [10]:
rst(O.delay)
header("note the absolute time of emissions:")
d = subs(O.range(0, 10).delay(1000))



========== delay ==========

module rx.linq.observable.delay
@extensionmethod(ObservableBase)
def delay(self, duetime, scheduler=None):
    Time shifts the observable sequence by duetime. The relative time
    intervals between the values are preserved.

    1 - res = rx.Observable.delay(datetime())
    2 - res = rx.Observable.delay(datetime(), Scheduler.timeout)

    3 - res = rx.Observable.delay(5000)
    4 - res = rx.Observable.delay(5000, Scheduler.timeout)

    Keyword arguments:
    :param datetime|int duetime: Absolute (specified as a datetime object) or
        relative time (specified as an integer denoting milliseconds) by which
        to shift the observable sequence.
    :param Scheduler scheduler: [Optional] Scheduler to run the delay timers on.
        If not specified, the timeout scheduler is used.

    :returns: Time-shifted sequence.
    :rtype: Observable
--------------------------------------------------------------------------------


========== note the absolute time of emissions: ==========


   2.1     M New subscription on stream 276542741
1005.6   T11 [next] 1003.4: 0
1005.9   T11 [next] 1003.7: 1
1006.1   T11 [next] 1003.9: 2
1006.2   T11 [next] 1003.9: 3
1006.2   T11 [next] 1004.0: 4
1006.3   T11 [next] 1004.0: 5
1006.3   T11 [next] 1004.1: 6
1006.4   T11 [next] 1004.1: 7
1006.4   T11 [next] 1004.1: 8
1006.4   T11 [next] 1004.2: 9
1006.5   T11 [cmpl] 1004.2: fin

I want to reemit items from an Observable only on condition that it was the first of a collection of Observables to emit an item amb


In [18]:
rst(O.amb)
s1 = O.range(0, 5).delay(100)
s2 = O.range(10, 5)

d = subs(O.amb(s1, s2))



========== amb ==========

module rx.linq.observable.amb
@extensionclassmethod(Observable)
def amb(cls, *args):
    Propagates the observable sequence that reacts first.

    E.g. winner = rx.Observable.amb(xs, ys, zs)

    Returns an observable sequence that surfaces any of the given sequences,
    whichever reacted first.
--------------------------------------------------------------------------------

   3.4     M New subscription on stream 276511041
   6.4     M [next]    2.8: 10
   6.7     M [next]    3.0: 11
   7.0     M [next]    3.4: 12
   7.5     M [next]    3.9: 13
   7.9     M [next]    4.2: 14
   8.2     M [cmpl]    4.6: fin

In [ ]: